package com.heima.schedule.service.impl;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.heima.common.redis.CacheService;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.schedule.dtos.TaskDto;
import com.heima.model.schedule.pojos.Taskinfo;
import com.heima.model.schedule.pojos.TaskinfoLogs;
import com.heima.schedule.mapper.TaskinfoLogsMapper;
import com.heima.schedule.mapper.TaskinfoMapper;
import com.heima.schedule.service.TaskinfoService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * @author jack
 * @data 2023 9:16
 */
@Service
@Transactional
@Slf4j
public class TaskinfoServiceImpl extends ServiceImpl<TaskinfoMapper, Taskinfo> implements TaskinfoService {
    @Autowired
    private TaskinfoLogsMapper logsMapper;
    @Autowired
    private CacheService cacheService;

    /**
     * 添加任务
     *
     * @param taskDto
     * @return
     */
    @Override
    public ResponseResult addTask(TaskDto taskDto) {
        //1. 将任务信息保存到DB
        addTaskToDB(taskDto);

        //2. 根据任务执行时间选择合适的redis数据结构来存储任务信息
        addTaskToCaChe(taskDto);
        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }


    /**
     * 根据任务的类型和优先级从redis的list中拉取任务
     *
     * @param taskType
     * @param priority
     * @return
     */
    @Override
    public ResponseResult pullTask(Integer taskType, Integer priority) {
        String key = "topic_" + taskType + "_" + priority;
        String taskJSON = cacheService.lRightPop(key);
        if (!StringUtils.isEmpty(taskJSON)){
            //将任务数据从数据库中移除，同时修改任务日志状态：1=EXECUTED
            TaskDto taskDto = JSON.parseObject(taskJSON, TaskDto.class);
            //删除taskinfo
            removeById(taskDto.getTaskId());
            //修改taskinfologs
            TaskinfoLogs logs = logsMapper.selectById(taskDto.getTaskId());
            logs.setStatus(1);
            logsMapper.updateById(logs);
            byte[] parameters = taskDto.getParameters();
            String idStr = new String(parameters, StandardCharsets.UTF_8);
            return ResponseResult.okResult(idStr);
        }else {
            return ResponseResult.errorResult(AppHttpCodeEnum.DATA_NOT_EXIST);
        }
    }

    //将任务信息保存到redis
    private void addTaskToCaChe(TaskDto taskDto) {
        //1. 拿到当前系统时间, 任务的执行时间， 预设时间【当前系统时间 + 5分钟】
        long now = System.currentTimeMillis();
        long executeTime = taskDto.getExecuteTime();
        long futureTime = now + (1000L * 60 * 5);

        //2. 如果任务的执行时间 <= 当前系统时间,将任务信息存入：list
        if (executeTime <= now) {
            cacheService.lLeftPush("topic_" + taskDto.getTaskType() + "_" + taskDto.getPriority(), JSON.toJSONString(taskDto));  //topic_1001_1
        } else if (executeTime <= futureTime) {
            //3. 如果 预设时间   >= 任务的执行时间 > 当前系统时间,将任务信息存入：zset
            cacheService.zAdd("future_" + taskDto.getTaskType() + "_" + taskDto.getPriority(), JSON.toJSONString(taskDto), taskDto.getExecuteTime());  //future_1001_1
        }
    }

    //将任务信息保存到DB
    private void addTaskToDB(TaskDto taskDto) {
        //1. 将任务保存到taskinfo表中
        Taskinfo taskinfo = new Taskinfo();
        BeanUtils.copyProperties(taskDto, taskinfo);
        taskinfo.setExecuteTime(new Date(taskDto.getExecuteTime()));
        save(taskinfo);
        taskDto.setTaskId(taskinfo.getTaskId());

        //2. 将任务保存到taskinfoLogs表中
        TaskinfoLogs logs = new TaskinfoLogs();
        BeanUtils.copyProperties(taskinfo, logs);
        logs.setVersion(1);
        logs.setStatus(0);
        logsMapper.insert(logs);
    }

    /*
        任务1： 将ZSet数据定时刷新到List中
        执行频次：每分钟执行一次
     */
    @Scheduled(cron = "0 0/1 * * * ?")
    public void refreshZSetToList(){
        //1. 获取ZSet中的所有数据
        Set<String> ZSetKeys = cacheService.scan("future*");        //[future_1001_1]
        if (ZSetKeys != null) {
            log.info("=============开始刷新ZSet数据到List============");
            for (String zSetKey : ZSetKeys) {                        //future_1001_1
                //2. 将执行时间到期的任务从ZSet中取出来，放到list
                Set<String> taskDtoJSONs = cacheService.zRangeByScore(zSetKey, 0, System.currentTimeMillis());
                String[] split = zSetKey.split("future");
                String topicKey = "topic" + split[1];               // topic_1001_1
                cacheService.refreshWithPipeline(zSetKey, topicKey, taskDtoJSONs);
            }
            log.info("=============刷新ZSet数据到List结束============");
        }
    }


    /*
        任务2： 将数据库任务数据定时刷新到redis中
        执行频次：每5分钟执行一次
     */
    @Scheduled(cron = "0 0/5 * * * ?")
    public void refreshDBToRedis(){
        //1. 查询出数据库中符合条件的任务： 执行时间 <= 当前时间 + 5分钟
        Date date = new Date(System.currentTimeMillis() + (1000 * 60 * 5));
        List<Taskinfo> taskinfoList = list(Wrappers.<Taskinfo>lambdaQuery().le(Taskinfo::getExecuteTime, date));

        if (taskinfoList != null) {
            //2. 清空缓存ZSet , List
            Set<String> ZSetKeys = cacheService.scan("future*");
            cacheService.delete(ZSetKeys);
            Set<String> listKeys = cacheService.scan("topic*");
            cacheService.delete(listKeys);

            //3. 将数据库查出来的任务存入缓存
            List<TaskDto> taskDtoList = taskinfoList.stream().map(taskinfo -> {
                TaskDto dto = new TaskDto();
                BeanUtils.copyProperties(taskinfo, dto);
                dto.setExecuteTime(taskinfo.getExecuteTime().getTime());
                return dto;
            }).collect(Collectors.toList());
            for (TaskDto taskDto : taskDtoList) {
                addTaskToCaChe(taskDto);
            }
        }
    }
}
